package com.demo;

import com.demo.rocket.MsgBuilder;
import com.demo.rocket.RocketConfig;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author root
 * @description TODO
 * @date 2021/3/26 14:59
 */
@RestController
@RequestMapping("rocket")
@Slf4j
public class ProducerController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @GetMapping("create/msg")
    public Object msg(String msg) {
        if (msg == null) {
            msg = "Hello World";
        }
        MessageHeaderAccessor accessor = new MessageHeaderAccessor();
        accessor.setHeader(MessageConst.PROPERTY_TAGS,"V");
        Message<String> message = MsgBuilder.msgBuilder(msg);

        //Message<String> message = MessageBuilder.withPayload(msg).build();
        // 主题地方，具体消息 注：必填
        rocketMQTemplate.send(RocketConfig.KEY,message);
        return msg;
    }
    @GetMapping("/custom")
    public Object tagMsg(String topic,String msg){
        if (msg == null) {
            msg = "test";
        }
        if (topic == null) {
            topic = RocketConfig.TEST;
        }
        rocketMQTemplate.send(topic,MsgBuilder.msgBuilder(msg));
        return msg;
    }



    @GetMapping("delay/msg")
    public Object delayMsg(String msg){
        msg = msg == null ? "Hello World" : msg;
        Message<String> message = MsgBuilder.msgBuilder(msg);
        message.getHeaders().put(MessageConst.PROPERTY_TAGS, "testTag");
        // 主题地方，具体消息 注：必填
        rocketMQTemplate.asyncSend(RocketConfig.KEY, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功: {}",sendResult.getSendStatus());
            }
            @Override
            public void onException(Throwable e) {
                log.info("发送失败: {}",e.getMessage());

            }
        },3000,3);
        return msg;
    }


    @GetMapping("transaction")
    public Object transaction(String msg){
        msg = msg == null ? "Hello transaction" : msg;
        MessageBuilder<String> header = MessageBuilder.withPayload(msg);
        Message<String> message = header.build();
        rocketMQTemplate.sendMessageInTransaction("test-transaction",message,null);
        return msg;
    }

    @RocketMQTransactionListener
    static class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            log.info("中间状态: {}",new String((byte[]) msg.getPayload()));
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            log.info("提交状态: {}",new String((byte[]) msg.getPayload()));
            return RocketMQLocalTransactionState.COMMIT;
        }
    }


}
